Implement row group skipping for the default engine parquet readers #362
scovich merged 33 commits into delta-io:main from
Conversation
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

@@ Coverage Diff @@
##             main     #362      +/-   ##
==========================================
+ Coverage   77.06%   77.66%   +0.59%
==========================================
  Files          47       49       +2
  Lines        9524    10079     +555
  Branches     9524    10079     +555
==========================================
+ Hits         7340     7828     +488
- Misses       1790     1805      +15
- Partials      394      446      +52

View full report in Codecov by Sentry.
    engine,
    commit_read_schema,
    checkpoint_read_schema,
    self.predicate.clone(),
NOTE: This was an existing bug -- passing a query-level filter to the metadata file reads.
(We probably should add a meta-skipping unit test for replay?)
Already done -- #381 added three new replay_for_XXX tests, which this PR updates to account for the expected row group skipping.
    .map_ok(|batch| (batch, true));

    let parquet_client = engine.get_parquet_handler();
    // TODO change predicate to: predicate AND add.path not null
I removed these two TODOs because the P&M query also invokes this code path, and filtering by add.path NOT NULL is just plain incorrect there. Also, see the code comment at the other replay call site for why it doesn't make sense to pass add.path IS NOT NULL as a row group skipping filter.
nicklan
left a comment
Looks great, thanks. Just a couple of small nits
    Some(self.get_stats(col)?.null_count_opt()? as i64)
    let nullcount = match self.get_stats(col) {
        Some(s) => s.null_count_opt()? as i64,
        None => self.get_rowcount_stat_value(),
This is a new find, exposed by accident when I hacked two more parts into the checkpoint so we could test transaction app id filtering (the "checkpoint" schema was truncated, which prevented the P&M query from skipping those parts).
If the statistics() method on ColumnChunkMetadata returns None, that just means that there are no stats for that column, but doesn't necessarily imply that all values are null does it?
Oh, good catch. I didn't put the check deep enough. There are three levels of None here:
- The column chunk doesn't even exist (infer nullcount = rowcount)
- The column chunk doesn't have stats (should not infer anything clever)
- The stats object lacks a particular stat
To make things even more "fun", we have the following warning in Statistics::null_count_opt 🤦:
this API returns Some(0) even if the null count was not present in the statistics
So I have two problems to work around now.
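The three-level None handling described above can be sketched with toy types (the names here are illustrative assumptions, not the actual kernel or parquet-crate API):

```rust
#[derive(Clone, Copy)]
struct ColStats {
    // None models the parquet quirk that an individual stat may be
    // absent even when a stats object exists for the chunk.
    null_count: Option<i64>,
}

struct RowGroup {
    row_count: i64,
    // Outer None: the column chunk doesn't exist in this row group.
    // Inner None: the chunk exists but carries no statistics.
    stats: Option<Option<ColStats>>,
}

impl RowGroup {
    /// Resolve a usable null count, or None if we must not infer anything.
    fn nullcount(&self) -> Option<i64> {
        match self.stats {
            // Missing column: every value is null, so nullcount == rowcount.
            None => Some(self.row_count),
            // Chunk exists but has no stats: infer nothing clever.
            Some(None) => None,
            // Stats exist, but the individual stat may still be absent.
            Some(Some(s)) => s.null_count,
        }
    }
}
```

The nested Option mirrors the distinction between a missing column chunk and a chunk that merely lacks statistics, so the "infer nullcount = rowcount" shortcut only fires in the first case.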
    // txn.appId    BINARY  0  "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..."
    // txn.version  INT64   0  "4390" / "4390"
    #[test]
    fn test_replay_for_metadata() {
An accidentally clever test :P
(see other comment)
    // checkpoint part when partitioned by `add.path` like the Delta spec requires. There's no
    // point filtering by a particular app id, even if we have one, because people usually query
    // for app ids that exist.
    let meta_predicate = Expr::column("txn.appId").is_not_null();
The code didn't previously attempt row group skipping for app ids. Now it does.
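As a toy illustration of why an IS NOT NULL predicate enables skipping (the function name is an assumption for this sketch, not the kernel's API): a row group can only be skipped when its stats prove that every value in the column is null.

```rust
/// Returns true iff stats prove the IS NOT NULL predicate matches no rows:
/// the column's null count equals the row count.
fn can_skip_is_not_null(null_count: Option<i64>, row_count: i64) -> bool {
    // Absent stats are inconclusive, so the row group must be kept.
    matches!(null_count, Some(n) if n == row_count)
}
```

Because most checkpoint parts contain at least one txn action with a non-null appId, this mostly skips parts that contain no txn actions at all.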
zachschuermann
left a comment
given the size of this PR i'll say this is a best-effort review but LGTM
nicklan
left a comment
Had a question about correctness, but otherwise lgtm
In preparation for #362 that actually implements parquet row group skipping, here we make various preparatory changes that can stand on their own:

* Plumb the predicates through to the parquet readers, so that they can easily start using them
* Add and use a new `Expression::is_not_null` helper that does what it says
* Factor out `replay_for_XXX` methods, so that log replay involving push-down predicates can be tested independently
* Don't involve <n>.json in log replay if <n>.checkpoint.parquet is available

This should make both changes easier to review.
nicklan
left a comment
lgtm! two small suggestions that you can take or leave as you prefer.
    !matches!(result, Some(false))
    }

    /// Returns `None` if the column doesn't exist and `Some(None)` if the column has no stats.
we could define an enum for this rather than Some(None). Just a thought, I'm okay with both ways, and the Some(None) will have cleaner code in the method (at the cost of a more confusing return type)
oh nice i think i commented on this too - maybe just Result? and we can have Err(MissingColumn) for a more understandable return type?
I avoided Result because that would be exceptions as control flow (this isn't actually an error, it's just a situation).
If this were public code, the enum might make sense. But for a private method, I don't think it's worth the cognitive overhead (both to define and use it) when one line of code comment can fully explain what's going on?
Yeah, I think it's fine as is since both the method and the call site have a comment.
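For readers following the thread, the two shapes under discussion can be sketched side by side (all names here are illustrative, not the actual code):

```rust
// The dedicated-enum shape proposed in review: three cases made explicit.
enum ColumnStats {
    MissingColumn,
    NoStats,
    Stats(i64),
}

// The nested-Option convention the PR kept: outer None = column missing,
// Some(None) = column present but without stats.
fn as_nested(c: &ColumnStats) -> Option<Option<i64>> {
    match c {
        ColumnStats::MissingColumn => None,
        ColumnStats::NoStats => Some(None),
        ColumnStats::Stats(v) => Some(Some(*v)),
    }
}
```

The two encodings carry the same information; the enum is self-documenting at the cost of extra definitions, while the nested Option leans on a doc comment at the method and call site.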
zachschuermann
left a comment
another stamp with a few more questions :)
do we need to introduce testing for the E2E stats-to-skipping path? this test suite covers all of the stats part it seems - are we relying on existing tests to make sure the remaining skipping based on these stats is correct?
I think that's what the test in read.rs was doing before I deleted it... should I reinstate?
do we think E2E path is covered without it? if not then I'm not opposed to one more test :)
Actually, I think test_data_row_group_skipping should cover it pretty well? It plumbs the predicate through starting from the scan builder. The only remaining coverage would be FFI and the toy table reader -- neither of which has any predicates that they could pass. If/when those get updated to support predicates at all, they should be able to test that the predicates actually work.
Previous PR #357 implemented the logic of stats-based skipping for a parquet reader, but in abstract form that doesn't actually depend on parquet footers. With that in place, we can now wire up the kernel default parquet readers to use row group skipping.
Also fixes #380.
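At a high level, the wiring amounts to evaluating a skipping predicate against each row group's stats and keeping only the groups that might match. A minimal sketch with toy types (not the default engine's actual reader), echoing the `!matches!(result, Some(false))` convention quoted above:

```rust
/// Return the indices of row groups that might satisfy the predicate.
/// `keep` returns Some(false) only when stats prove no row can match;
/// None (inconclusive stats) must keep the row group.
fn select_row_groups<S>(groups: &[S], keep: impl Fn(&S) -> Option<bool>) -> Vec<usize> {
    groups
        .iter()
        .enumerate()
        // Skip a row group only on a definitive Some(false).
        .filter(|(_, g)| !matches!(keep(g), Some(false)))
        .map(|(i, _)| i)
        .collect()
}
```

Treating None as "keep" is the safety property that makes the earlier null-count discussion matter: inferring a stat where none exists could turn a conservative skip into a correctness bug.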